package com.fintech.pangu.elasticsearch.autoconfigure;

import com.fintech.pangu.elasticsearch.service.BulkProcessorService;
import com.fintech.pangu.elasticsearch.service.impl.BulkProcessorServiceImpl;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the Elasticsearch bulk-processing beans.
 *
 * <p>The configuration is active only when the property
 * {@code pangu.elasticsearch.bulkProcessor.enable} has the value {@code true}.
 * Tuning values are read from {@link BulkProcessorProperties}.
 *
 * @author xujunqi
 * @since 1.0.0
 */
@ConditionalOnProperty(prefix = "pangu.elasticsearch.bulkProcessor", havingValue = "true", name = "enable")
@EnableConfigurationProperties(BulkProcessorProperties.class)
@Configuration
public class BulkProcessorConfiguration {
    private static final Logger logger = LoggerFactory.getLogger(BulkProcessorConfiguration.class);

    /**
     * Format of the "bulk finished" log line. It is shared by the success and the failure
     * branch so both outcomes keep the same field layout.
     */
    private static final String AFTER_BULK_LOG_FORMAT =
            "调用bulk之后:executionId=[{}],hasFailures=[{}],numberOfActions=[{}],buildFailureMessage=[{}]";

    /**
     * Builds the {@link BulkProcessor} bean from the configured properties.
     *
     * <p>The bean is registered only when a {@link TransportClient} bean exists and no other
     * {@link BulkProcessor} bean has been defined. {@code close} is declared as its destroy method.
     *
     * @param transportClient         client used to execute the bulk requests
     * @param bulkProcessorProperties source of the flush thresholds, concurrency and backoff settings
     * @return the configured bulk processor
     */
    @ConditionalOnBean({TransportClient.class})
    @ConditionalOnMissingBean({BulkProcessor.class})
    @Bean(name = "bulkProcessor", destroyMethod = "close")
    public BulkProcessor setBulkProcessor(TransportClient transportClient, BulkProcessorProperties bulkProcessorProperties) {
        int bulkActions = bulkProcessorProperties.getBulkActions();
        int bulkSize = bulkProcessorProperties.getBulkSize();
        int flushInterval = bulkProcessorProperties.getFlushInterval();
        int concurrentRequests = bulkProcessorProperties.getConcurrentRequests();
        int initialDelayTime = bulkProcessorProperties.getBackoffPolicy().getInitialDelayTime();
        int maxNumberOfRetries = bulkProcessorProperties.getBackoffPolicy().getMaxNumberOfRetries();

        return BulkProcessor.builder(transportClient, loggingListener())
                // Execute a bulk once this many requests have been added.
                .setBulkActions(bulkActions)
                // Execute a bulk once the buffered requests reach this size; the value is in megabytes.
                .setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB))
                // Execute a bulk at this interval; the value is in seconds.
                .setFlushInterval(TimeValue.timeValueSeconds(flushInterval))
                // Number of bulk requests that may be in flight concurrently. Per the original
                // author's note: 0 makes accumulating and sending synchronous, while 1 (the
                // library default) sends asynchronously while new requests keep accumulating.
                .setConcurrentRequests(concurrentRequests)
                // Exponential backoff: the first retry waits initialDelayTime milliseconds and the
                // wait grows on each attempt, up to maxNumberOfRetries retries. Per the Elasticsearch
                // documentation, retries apply to requests rejected with EsRejectedExecutionException;
                // BackoffPolicy.noBackoff() would disable retrying altogether.
                .setBackoffPolicy(
                        BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(initialDelayTime), maxNumberOfRetries))
                .build();
    }

    /**
     * Exposes the {@link BulkProcessorService} bean backed by the given bulk processor.
     *
     * @param bulkProcessor processor the service implementation is constructed with
     * @return a new {@link BulkProcessorServiceImpl} wrapping {@code bulkProcessor}
     */
    @Bean(name = "bulkProcessorService")
    public BulkProcessorService bulkProcessorService(BulkProcessor bulkProcessor) {
        return new BulkProcessorServiceImpl(bulkProcessor);
    }

    /**
     * Creates the listener that logs the life cycle of every bulk execution.
     *
     * @return a listener that only writes log statements and does not alter the requests
     */
    private static BulkProcessor.Listener loggingListener() {
        return new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest bulkRequest) {
                // Invoked before the bulk is executed; report how many actions it carries.
                int numberOfActions = bulkRequest.numberOfActions();
                logger.info("调用bulk之前:executionId=[{}],numberOfActions=[{}]", executionId, numberOfActions);
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, BulkResponse bulkResponse) {
                // Invoked after the bulk returned a response; individual items may still have failed.
                int numberOfActions = bulkRequest.numberOfActions();
                if (bulkResponse.hasFailures()) {
                    // Failed items are logged at WARN so they are not buried among routine INFO
                    // lines. The failure message is built only on this branch.
                    logger.warn(AFTER_BULK_LOG_FORMAT, executionId, true, numberOfActions,
                            bulkResponse.buildFailureMessage());
                } else {
                    // Nothing failed: keep the same field layout but leave the failure message empty.
                    logger.info(AFTER_BULK_LOG_FORMAT, executionId, false, numberOfActions, "");
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest bulkRequest, Throwable throwable) {
                // Invoked when the bulk as a whole failed with a Throwable.
                logger.error("执行executionId=[{}]批量更新异常:", executionId, throwable);
            }
        };
    }
}
